/*
 * Copyright (C) 2011-2012 Matteo Landi, Luigi Rizzo. All rights reserved.
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions
 * are met:
 *   1. Redistributions of source code must retain the above copyright
 *      notice, this list of conditions and the following disclaimer.
 *   2. Redistributions in binary form must reproduce the above copyright
 *      notice, this list of conditions and the following disclaimer in the
 *    documentation and/or other materials provided with the distribution.
 *
 * THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND
 * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
 * ARE DISCLAIMED.  IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE
 * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
 * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
 * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
 * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
 * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
 * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
 * SUCH DAMAGE.
 */

/*
 * $FreeBSD: head/tools/tools/netmap/pkt-gen.c 231198 2012-02-08 11:43:29Z luigi $
 * $Id: netmap.cpp,v 1.1.2.3 2013/02/20 01:43:40 chenht Exp $
 *
 * Example program to show how to build a multithreaded packet
 * source/sink using the netmap device.
 *
 * In this example we create a programmable number of threads
 * to take care of all the queues of the interface used to
 * send or receive traffic.
 *
 */

#include "nm_util.h"
/*
 * Packet consumer implemented outside this file.  receive_packets() calls
 * it once per received slot with the netmap buffer, the slot length and
 * direct == 2.
 * NOTE(review): the meaning of the `direct` values is not visible here --
 * confirm against the definition.
 */
extern void process_pkt(void *worker, u_char *buf,int len,int direct);
/*
 * Creates the opaque per-thread handle that is later passed to
 * process_pkt().  Each receiver thread calls it once with the interface
 * name and its thread index.
 */
extern void *worker_create(const char *ifname, int thrid);

/*
 * Verbosity flag; never modified in this file.
 * NOTE(review): presumably read by the logging helpers from nm_util.h --
 * confirm.
 */
int verbose = 0;

/*
 * Ethernet + IP + UDP headers followed by a fixed-size payload area.
 * The struct is packed, so the members are laid out back to back with
 * no padding.  In the visible code it is only used as a member of
 * struct targ.
 */
struct pkt {
	struct ether_header eh;
	struct ip ip;
	struct udphdr udp;
	uint8_t body[2048];	// XXX hardwired
} __attribute__((__packed__));

/*
 * Global arguments shared by all threads.  Every struct targ carries a
 * pointer to one instance (targ->g).
 */

struct glob_arg {
	int pkt_size;	/* never set in the visible code (stays 0) */
	int npackets;	/* total packets to send */
	int nthreads;	/* number of receiver threads; set to the RX ring count */
	int cpus;	/* CPU count as returned by system_ncpus() */
};


/*
 * Per-thread state.  start_netmap_pthread() fills in one entry per
 * receiver thread and passes its address to the thread as argument.
 */
struct targ {
        struct glob_arg *g;     /* shared global arguments */
        int used;               /* 1 while the thread is active; cleared by the thread on exit */
        int completed;          /* set to 1 by the thread once its main loop has ended */
        int fd;                 /* this thread's own /dev/netmap descriptor */
        struct nmreq nmr;       /* copy of the request used to register fd */
        struct netmap_if *nifp; /* netmap interface descriptor inside the shared mapping */
        uint16_t        qfirst, qlast; /* range of queues to scan */
        uint64_t count;         /* packets received so far */
        struct timeval tic, toc; /* timestamps taken when the main loop starts / ends */
        int me;                 /* thread index */
        pthread_t thread;       /* thread handle filled in by pthread_create() */
        int affinity;           /* CPU to pin the thread to, -1 for no pinning */

        struct pkt pkt;         /* not referenced by the visible code */
};

/* Array of per-thread descriptors allocated by start_netmap_pthread(). */
static struct targ *targs;
/* Number of threads requested by the last start_netmap_pthread() call. */
static int global_nthreads;
/*
 * Incremented by start_netmap_pthread() after each thread is created.
 * Every receiver thread copies the current value when it starts and
 * publishes the address of that copy through pthkey.
 */
static int pno=0;
/* Thread-specific data key, defined outside this file. */
extern pthread_key_t pthkey;
/* Defined outside this file; not referenced by the visible code. */
extern void *ph_sendto(void *p);
/* Called with the thread id and interface name of each receiver thread. */
extern int  add_card_thread_map(pthread_t tid,char *dev);
/* sysctl wrapper to return the number of active CPUs */
/*
 * system_ncpus -- number of CPUs in the system.
 *
 * On FreeBSD the value comes from the hw.ncpu sysctl.  If the query
 * fails or yields a non-positive count, 1 is returned so that callers
 * can always use the result as a divisor.  On every other platform the
 * function returns 1.
 */
static int
system_ncpus(void)
{
#ifdef __FreeBSD__
	int mib[2] = { CTL_HW, HW_NCPU };
	int ncpus = 1;
	/* sysctl() wants the size of the output buffer, not of mib[]. */
	size_t len = sizeof(ncpus);

	if (sysctl(mib, 2, &ncpus, &len, NULL, 0) == -1 || ncpus < 1)
		return 1;

	return (ncpus);
#else
	return 1;
#endif /* !__FreeBSD__ */
}

/*
 * setaffinity -- pin thread `me` to CPU `i`.
 *
 * An index of -1 means "no pinning" and is accepted without doing
 * anything.  Pinning is only implemented on FreeBSD; elsewhere the
 * call is a no-op.
 *
 * Returns 0 on success (or when nothing had to be done), 1 if the
 * affinity could not be set.
 */
static int
setaffinity(pthread_t me, int i)
{
#ifndef __FreeBSD__
	/* Nothing to do here; just keep the compiler quiet. */
	(void)me;
	(void)i;
	return 0;
#else
	cpuset_t set;
	int rc;

	if (i == -1)
		return 0;

	/* Build a mask that contains only the requested CPU. */
	CPU_ZERO(&set);
	CPU_SET(i, &set);

	rc = pthread_setaffinity_np(me, sizeof(set), &set);
	if (rc == 0)
		return 0;

	D("Unable to set affinity");
	return 1;
#endif /* __FreeBSD__ */
}

/*
 * receive_packets -- consume up to `limit` slots from one RX ring.
 *
 * Starting at ring->cur, the buffer of every available slot is passed
 * to process_pkt() together with the slot length and 2 as the `direct`
 * argument.  When done, ring->avail is decreased by the number of slots
 * consumed and ring->cur is moved past them.
 *
 * Returns the number of slots consumed.
 */
static int
receive_packets(void *worker, struct netmap_ring *ring, u_int limit)
{
	u_int pos = ring->cur;
	u_int budget = limit;
	u_int done = 0;

	/* Never try to take more than the ring currently holds. */
	if (budget > ring->avail)
		budget = ring->avail;

	while (done < budget) {
		struct netmap_slot *slot = &ring->slot[pos];

		process_pkt(worker, (u_char *)NETMAP_BUF(ring, slot->buf_idx),
		    slot->len, 2);
		pos = NETMAP_RING_NEXT(ring, pos);
		done++;
	}

	/* Record how far we got in the ring. */
	ring->avail -= done;
	ring->cur = pos;

	return (done);
}

/*
 * receiver_body -- entry point of a receiver thread.
 *
 * `data` is the thread's struct targ.  The thread creates its worker
 * handle, applies the requested CPU affinity, publishes its snapshot of
 * `pno` through pthkey and then polls its netmap descriptor, passing the
 * content of the rings [qfirst, qlast) to receive_packets().
 *
 * The receive loop keeps running across idle periods (poll timeouts) and
 * only ends when poll() reports an error.  targ->count is updated while
 * the loop runs; on exit targ->toc is stamped, targ->completed is set and
 * targ->used is cleared.
 *
 * Always returns NULL.
 */
static void *
receiver_body(void *data)
{
	struct targ *targ = (struct targ *) data;
	struct pollfd fds[1];
	struct netmap_if *nifp = targ->nifp;
	struct netmap_ring *rxring;
	int i;
	/* 64 bit like targ->count: a long-running receiver overflows an int. */
	uint64_t received = 0;
	int prino;

	void *worker = worker_create(targ->nmr.nr_name, targ->me);

	if (setaffinity(targ->thread, targ->affinity))
		goto quit;
	/*
	 * The key refers to a local variable, so the stored pointer is
	 * only valid while this function is running.
	 */
	prino=pno;
	pthread_setspecific(pthkey,&prino);
	/* setup poll(2) mechanism. */
	memset(fds, 0, sizeof(fds));
	fds[0].fd = targ->fd;
	fds[0].events = (POLLIN);
	sleep(5);
	/* unbounded wait for the first packet. */
	for (;;) {
		i = poll(fds, 1, 1000);
		if (i > 0 && !(fds[0].revents & POLLERR))
			break;
	}

	/* main loop: runs until poll() fails, idle periods are tolerated */
	gettimeofday(&targ->tic, NULL);
	for (;;) {
		int ret = poll(fds, 1, 1 * 1000);

		if (ret < 0) {
			/*
			 * poll() failed: stop receiving.  No timeout has
			 * elapsed on this path, so the end timestamp is
			 * taken as is.
			 */
			gettimeofday(&targ->toc, NULL);
			break;
		}
		if (ret == 0) {
			/* 1s without traffic: keep waiting. */
			continue;
		}

		for (i = targ->qfirst; i < targ->qlast; i++) {
			rxring = NETMAP_RXRING(nifp, i);
			if (rxring->avail == 0)
				continue;

			/* take at most 100 packets per ring and per pass */
			received += receive_packets(worker, rxring, 100);
			targ->count = received;
		}
	}

	targ->completed = 1;
	targ->count = received;

quit:
	/* reset the ``used`` flag. */
	targ->used = 0;

	return (NULL);
}

/*
 * Pairs a string key with an untyped pointer.
 * Not referenced anywhere in the visible part of this file.
 */
struct sf {
	char *key;
	void *f;
};

int
start_netmap_pthread(char *ifname)
{
	int i, fd;
	struct glob_arg g;

	struct nmreq nmr;
	void *mmap_addr;		/* the mmap address */
	void *(*td_body)(void *) = receiver_body;
	int devqueues = 1;	/* how many device queues */
	int affinity = -1;

	bzero(&g, sizeof(g));

	g.nthreads = 8;
	g.cpus = system_ncpus();

	bzero(&nmr, sizeof(nmr));
	nmr.nr_version = NETMAP_API;
	/*
	 * Open the netmap device to fetch the number of queues of our
	 * interface.
	 *
	 * The first NIOCREGIF also detaches the card from the
	 * protocol stack and may cause a reset of the card,
	 * which in turn may take some time for the PHY to
	 * reconfigure.
	 */
	fd = open("/dev/netmap", O_RDWR);
	if (fd == -1) {
		D("Unable to open /dev/netmap");
		// fail later
	} else {
		if ((ioctl(fd, NIOCGINFO, &nmr)) == -1) {
			D("Unable to get if info without name");
		} else {
			D("map size is %d Kb", nmr.nr_memsize >> 10);
		}
		bzero(&nmr, sizeof(nmr));
		nmr.nr_version = NETMAP_API;
		strncpy(nmr.nr_name, ifname, sizeof(nmr.nr_name));
		if ((ioctl(fd, NIOCGINFO, &nmr)) == -1) {
            printf("%d\n", errno);
			D("Unable to get if info for %s", ifname);
		}
		D("INFO %s", ifname);
		D("nr_tx_slots is %d", nmr.nr_tx_slots);
		D("nr_rx_slots is %d", nmr.nr_rx_slots);
		D("nr_tx_rings is %d", nmr.nr_tx_rings);
		D("nr_rx_rings is %d", nmr.nr_rx_rings);
		devqueues = nmr.nr_rx_rings;
	}

	g.nthreads = devqueues;
	/*
	 * Map the netmap shared memory: instead of issuing mmap()
	 * inside the body of the threads, we prefer to keep this
	 * operation here to simplify the thread logic.
	 */
	D("mmapping %d Kbytes", nmr.nr_memsize>>10);
	mmap_addr = (struct netmap_d *) mmap(0, nmr.nr_memsize,
					    PROT_WRITE | PROT_READ,
					    MAP_SHARED, fd, 0);
	if (mmap_addr == MAP_FAILED) {
		D("Unable to mmap %d KB", nmr.nr_memsize >> 10);
		// continue, fail later
	}

	/*
	 * Register the interface on the netmap device: from now on,
	 * we can operate on the network interface without any
	 * interference from the legacy network stack.
	 *
	 * We decide to put the first interface registration here to
	 * give time to cards that take a long time to reset the PHY.
	 */
	nmr.nr_version = NETMAP_API;
	if (ioctl(fd, NIOCREGIF, &nmr) == -1) {
		D("Unable to register interface %s", ifname);
		//continue, fail later
	}

	sleep(5);
	/* Print some debug information. */
	fprintf(stdout,
		"%s %s: %d queues, %d threads.\n",
		"Receiving from",
		ifname,
		devqueues,
		g.nthreads);
			
	/* Exit if something went wrong. */
	if (fd < 0) {
		D("aborting");
	}

	/* Install ^C handler. */
	global_nthreads = g.nthreads;

	targs = (struct targ *)calloc(g.nthreads, sizeof(*targs));
	/*
	 * Now create the desired number of threads, each one
	 * using a single descriptor.
 	 */
	for (i = 0; i < g.nthreads; i++) {
		struct netmap_if *tnifp;
		struct nmreq tifreq;
		int tfd;

		/* register interface. */
		tfd = open("/dev/netmap", O_RDWR);
		if (tfd == -1) {
			D("Unable to open /dev/netmap");
			continue;
		}

		bzero(&tifreq, sizeof(tifreq));
		strncpy(tifreq.nr_name, ifname, sizeof(tifreq.nr_name));
		tifreq.nr_version = NETMAP_API;
		tifreq.nr_ringid = (g.nthreads > 1) ? (i | NETMAP_HW_RING) : 0;

		/*
		 * if we are acting as a receiver only, do not touch the transmit ring.
		 * This is not the default because many apps may use the interface
		 * in both directions, but a pure receiver does not.
		 */
		if (td_body == receiver_body) {
			tifreq.nr_ringid |= NETMAP_NO_TX_POLL;
		}

		if ((ioctl(tfd, NIOCREGIF, &tifreq)) == -1) {
			D("Unable to register %s", ifname);
			continue;
		}
		tnifp = NETMAP_IF(mmap_addr, tifreq.nr_offset);
		/* start threads. */
		bzero(&targs[i], sizeof(targs[i]));
		targs[i].g = &g;
		targs[i].used = 1;
		targs[i].completed = 0;
		targs[i].fd = tfd;
		targs[i].nmr = tifreq;
		targs[i].nifp = tnifp;
		targs[i].qfirst = (g.nthreads > 1) ? i : 0;
		targs[i].qlast = (g.nthreads > 1) ? i+1 :
			(td_body == receiver_body ? tifreq.nr_rx_rings : tifreq.nr_tx_rings);
		targs[i].me = i;
		if (affinity >= 0) {
			if (affinity < g.cpus)
				targs[i].affinity = affinity;
			else
				targs[i].affinity = i % g.cpus;
		} else
			targs[i].affinity = -1;

		if (pthread_create(&targs[i].thread, NULL, td_body,
				   &targs[i]) == -1) {
			D("Unable to create thread %d", i);
			targs[i].used = 0;
		}
		sleep(1);
		pno++;
		add_card_thread_map(targs[i].thread,ifname);
	}
	return 0;

    {
	uint64_t my_count = 0, prev = 0;
	uint64_t count = 0;
	double delta_t;
	struct timeval tic, toc;

	gettimeofday(&toc, NULL);
	for (;;) {
		struct timeval now, delta;
		uint64_t pps;
		int done = 0;

		delta.tv_sec = 1;
		delta.tv_usec = 0;
		select(0, NULL, NULL, NULL, &delta);
		gettimeofday(&now, NULL);
		timersub(&now, &toc, &toc);
		my_count = 0;
		for (i = 0; i < g.nthreads; i++) {
			my_count += targs[i].count;
			if (targs[i].used == 0)
				done++;
		}
		pps = toc.tv_sec* 1000000 + toc.tv_usec;
		if (pps < 10000)
			continue;
		pps = (my_count - prev)*1000000 / pps;
		prev = my_count;
		toc = now;
		if (done == g.nthreads)
			break;
	}

	timerclear(&tic);
	timerclear(&toc);
	for (i = 0; i < g.nthreads; i++) {
		/*
		 * Join active threads, unregister interfaces and close
		 * file descriptors.
		 */
		pthread_join(targs[i].thread, NULL);
		ioctl(targs[i].fd, NIOCUNREGIF, &targs[i].nmr);
		close(targs[i].fd);

		if (targs[i].completed == 0)
			continue;

		/*
		 * Collect threads output and extract information about
		 * how long it took to send all the packets.
		 */
		count += targs[i].count;
		if (!timerisset(&tic) || timercmp(&targs[i].tic, &tic, <))
			tic = targs[i].tic;
		if (!timerisset(&toc) || timercmp(&targs[i].toc, &toc, >))
			toc = targs[i].toc;
	}

	/* print output. */
	timersub(&toc, &tic, &toc);
	delta_t = toc.tv_sec + 1e-6* toc.tv_usec;
    }

	ioctl(fd, NIOCUNREGIF, &nmr);
	munmap(mmap_addr, nmr.nr_memsize);
	close(fd);

	return (0);
}
/* end of file */
